package com.ks.mqttServer;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import lombok.extern.slf4j.Slf4j;

/**
 * @program: SpringBootDemos
 * @description: Netty-based MQTT server bootstrap. Each accepted connection gets a pipeline of
 *               {@link MqttDecoder} -> {@link MqttEncoder} -> {@code MqttServerHandler}.
 * @author: Kangsen
 * @create: 2022-06-23 14:13
 *
 * MQTT QoS has three levels, each with a different acknowledgement mechanism:
 *      QoS 0: fire and forget; once the message is sent, nothing further is tracked.
 *
 *      QoS 1: after sending, the server waits for the client's PUBACK. Once the client is
 *      confirmed to have received the message, it is removed from the message queue;
 *      otherwise it is re-sent after a timeout. The client delivers a message as soon as it
 *      receives it, so duplicates are possible: e.g. the message was received but not
 *      acknowledged, so the retransmission delivers the same message to the client again.
 *
 *      QoS 2: after sending, several acknowledgements are exchanged. S -> C: C receives the
 *      message and returns PUBREC to S; S receives PUBREC and returns PUBREL to C;
 *      C receives PUBREL and returns PUBCOMP to S.
 *
 **/
@Slf4j
public class MqttServer {

    /** Number of threads in the boss group passed first to {@code ServerBootstrap.group}. */
    private static final int BOSS_THREADS = 1;

    /** Number of threads in the worker group passed second to {@code ServerBootstrap.group}. */
    private static final int WORKER_THREADS = 20;

    /** Value handed to the {@link MqttDecoder} constructor (1 MiB). */
    private static final int MAX_BYTES_IN_MESSAGE = 1024 * 1024;

    /**
     * Binds an MQTT server to the given port and blocks the calling thread until the
     * server channel is closed or the thread is interrupted. Both event loop groups are
     * asked to shut down gracefully before this method returns, whatever the outcome.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt status is
     * restored so that callers can observe it, and the method returns normally.
     *
     * @param port the TCP port to listen on
     */
    public void startServer(int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup(BOSS_THREADS);
        EventLoopGroup workGroup = new NioEventLoopGroup(WORKER_THREADS);
        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    // NOTE(review): SO_REUSEADDR is set as a child option here; if the intent
                    // was to allow fast re-binding of the listening port it presumably belongs
                    // in option(...) on the server channel instead - confirm before changing.
                    .childOption(ChannelOption.SO_REUSEADDR, true)
                    .childHandler(
                            new ChannelInitializer<SocketChannel>() {
                                @Override
                                protected void initChannel(SocketChannel socketChannel) throws Exception {
                                    // Decoder and handler are created per channel; the encoder
                                    // is used via its shared INSTANCE.
                                    socketChannel.pipeline().addLast(new MqttDecoder(MAX_BYTES_IN_MESSAGE));
                                    socketChannel.pipeline().addLast(MqttEncoder.INSTANCE);
                                    socketChannel.pipeline().addLast(new MqttServerHandler());
                                }
                            }
                    );
            ChannelFuture future = serverBootstrap.bind(port).sync();
            log.info("mqtt server started port: {}",port);
            // Block here until the server channel is closed.
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the interrupt flag instead of swallowing it, and report through the logger.
            Thread.currentThread().interrupt();
            log.warn("mqtt server interrupted, shutting down port: {}", port, e);
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }
}
